<%#
 Copyright 2013-2020 the original author or authors from the JHipster project.

 This file is part of the JHipster project, see https://www.jhipster.tech/
 for more information.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-%>
package <%= packageName %>.service;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Stream;

import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.core.DatabaseClient;
import org.springframework.data.r2dbc.core.DatabaseClient.GenericInsertSpec;
import org.springframework.data.r2dbc.core.ReactiveDataAccessStrategy;
import org.springframework.data.r2dbc.mapping.OutboundRow;
import org.springframework.data.r2dbc.mapping.SettableValue;
import org.springframework.data.r2dbc.query.UpdateMapper;
import org.springframework.data.relational.core.query.Criteria;
import org.springframework.data.relational.core.mapping.RelationalPersistentEntity;
import org.springframework.data.relational.core.sql.OrderByField;
import org.springframework.data.relational.core.sql.Select;
import org.springframework.data.relational.core.sql.SelectBuilder.SelectFromAndJoin;
import org.springframework.data.relational.core.sql.SelectBuilder.SelectFromAndJoinCondition;
import org.springframework.data.relational.core.sql.SelectBuilder.SelectOrdered;
import org.springframework.data.relational.core.sql.SqlIdentifier;
import org.springframework.data.relational.core.sql.Table;
import org.springframework.data.relational.core.sql.render.SqlRenderer;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Helper service built on top of the R2DBC {@link DatabaseClient}.
 * <p>
 * It renders SQL select statements from an entity type, paging parameters and a select fragment,
 * and offers small helpers to insert entities, delete rows and maintain link (join) tables.
 */
@Service
public class EntityManager {
    /** Alias given to the entity table when order-by clauses are generated. */
    public static final String ENTITY_ALIAS = "e";
    /** Prefix prepended to a property name to build the column alias used in order-by clauses. */
    public static final String ALIAS_PREFIX = "e_";

    /**
     * Describes a link table: its name, the column holding the id of the owning entity
     * and the column holding the id of the referenced entity.
     */
    public static class LinkTable {
        final String tableName;
        final String idColumn;
        final String referenceColumn;

        /**
         * @param tableName the name of the link table, must not be null.
         * @param idColumn the column holding the id of the owning entity, must not be null.
         * @param referenceColumn the column holding the id of the referenced entity, must not be null.
         */
        public LinkTable(String tableName, String idColumn, String referenceColumn) {
            Assert.notNull(tableName, "tableName is null");
            Assert.notNull(idColumn, "idColumn is null");
            Assert.notNull(referenceColumn, "referenceColumn is null");
            this.tableName = tableName;
            this.idColumn = idColumn;
            this.referenceColumn = referenceColumn;
        }
    }

    private final ReactiveDataAccessStrategy dataAccessStrategy;
    private final SqlRenderer sqlRenderer;
    private final UpdateMapper updateMapper;
    private final DatabaseClient db;

    /**
     * @param dataAccessStrategy used to obtain the converter, the mapping context and the outbound row of an entity.
     * @param sqlRenderer renders {@link Select} objects to SQL strings.
     * @param updateMapper maps the properties of a {@link Sort} for a persistent entity.
     * @param db the client used to execute insert and delete statements.
     */
    public EntityManager(ReactiveDataAccessStrategy dataAccessStrategy, SqlRenderer sqlRenderer, UpdateMapper updateMapper, DatabaseClient db) {
        this.dataAccessStrategy = dataAccessStrategy;
        this.sqlRenderer = sqlRenderer;
        this.updateMapper = updateMapper;
        this.db = db;
    }

    /**
     * Creates an SQL select statement from the given fragment and pagination parameters.
     * @param selectFrom a representation of a select statement.
     * @param entityType the entity type which holds the table name.
     * @param pageable page parameter, or null (or an unpaged instance), if everything needs to be returned
     * @return sql select statement
     */
    public String createSelect(SelectFromAndJoin selectFrom, Class<?> entityType, Pageable pageable) {
        if (pageable == null) {
            return createSelectImpl(selectFrom, entityType, null);
        }
        if (!pageable.isPaged()) {
            // An unpaged Pageable has no page size/offset to read, so no limit/offset is applied;
            // its sort is still honoured.
            return createSelectImpl(selectFrom, entityType, pageable.getSort());
        }
        return createSelectImpl(selectFrom.limitOffset(pageable.getPageSize(), pageable.getOffset()), entityType, pageable.getSort());
    }

    /**
     * Creates an SQL select statement from the given fragment and pagination parameters.
     * @param selectFrom a representation of a select statement.
     * @param entityType the entity type which holds the table name.
     * @param pageable page parameter, or null (or an unpaged instance), if everything needs to be returned
     * @return sql select statement
     */
    public String createSelect(SelectFromAndJoinCondition selectFrom, Class<?> entityType, Pageable pageable) {
        if (pageable == null) {
            return createSelectImpl(selectFrom, entityType, null);
        }
        if (!pageable.isPaged()) {
            // An unpaged Pageable has no page size/offset to read, so no limit/offset is applied;
            // its sort is still honoured.
            return createSelectImpl(selectFrom, entityType, pageable.getSort());
        }
        return createSelectImpl(selectFrom.limitOffset(pageable.getPageSize(), pageable.getOffset()), entityType, pageable.getSort());
    }

    /**
     * Builds the select, adding an order-by clause when a sort is requested, and renders it to SQL.
     * @param selectFrom the select fragment, already limited if paging applies.
     * @param entityType the entity type which holds the table name.
     * @param sortParameter the requested sort, may be null.
     * @return sql select statement
     */
    private String createSelectImpl(SelectOrdered selectFrom, Class<?> entityType, Sort sortParameter) {
        final Select select;
        if (sortParameter != null && sortParameter.isSorted()) {
            RelationalPersistentEntity<?> entity = getPersistentEntity(entityType);
            Sort sort = updateMapper.getMappedObject(sortParameter, entity);
            Table entityTable = Table.create(entity.getTableName()).as(EntityManager.ENTITY_ALIAS);
            select = selectFrom.orderBy(createOrderByFields(entityTable, sort)).build();
        } else {
            select = selectFrom.build();
        }
        return createSelect(select);
    }

    /**
     * Looks up the persistent entity of the given type in the converter's mapping context.
     * @param entityType the entity type.
     * @return the persistent entity as returned by the mapping context.
     */
    private RelationalPersistentEntity<?> getPersistentEntity(Class<?> entityType) {
        return (RelationalPersistentEntity<?>) dataAccessStrategy.getConverter().getMappingContext()
                .getPersistentEntity(entityType);
    }

    /**
     * Deletes all the entities of the given type, and returns the number of deletions.
     * @param entityType the entity type which holds the table name.
     * @return the number of deleted entities.
     */
    public Mono<Integer> deleteAll(Class<?> entityType) {
        return db.delete().from(entityType).fetch().rowsUpdated();
    }

    /**
     * Deletes all the rows from the given table, and returns the number of deletions.
     * @param tableName the name of the table whose rows are deleted.
     * @return the number of deleted rows.
     */
    public Mono<Integer> deleteAll(String tableName) {
        return db.delete().from(tableName).fetch().rowsUpdated();
    }

    /**
     * Generates the actual SQL from the given {@link Select}.
     * @param select a representation of a select statement.
     * @return the generated SQL select.
     */
    public String createSelect(Select select) {
        return sqlRenderer.render(select);
    }

    /**
     * Inserts the given entity into the database - and sets the id, if it's an autoincrement field.
     * <p>
     * The id column is left out of the insert when it holds no value; every other column is always
     * written. The insert result is mapped with the converter's {@code populateIdIfNecessary}, and
     * the given entity itself is emitted when the insert yields no row.
     * @param <S> the type of the persisted entity.
     * @param entity the entity to be inserted into the database.
     * @return the persisted entity.
     */
    public <S> Mono<S> insert(S entity) {
        OutboundRow row = dataAccessStrategy.getOutboundRow(entity);
        RelationalPersistentEntity<?> persistentEntity = getPersistentEntity(entity.getClass());

        GenericInsertSpec<Map<String, Object>> insertSpec = db.insert().into(persistentEntity.getTableName());
        SqlIdentifier idColumn = persistentEntity.getIdColumn();
        for (Entry<SqlIdentifier, SettableValue> column : row.entrySet()) {
            // Skip only the id column without a value; null values of other columns are still bound.
            if (column.getValue().hasValue() || !column.getKey().equals(idColumn)) {
                insertSpec = insertSpec.value(column.getKey(), column.getValue());
            }
        }
        return insertSpec.map(dataAccessStrategy.getConverter().populateIdIfNecessary(entity))
            .first()
            .defaultIfEmpty(entity);
    }

    /**
     * Updates the table, which links the entity with the referred entities.
     * <p>
     * The existing links of the entity are deleted first, then one row is inserted per referenced id.
     * @param table describes the link table, it contains a table name, the column name for the id, and for the referred entity id.
     * @param entityId the id of the entity, for which the links are created, must not be null.
     * @param referencedIds the id of the referred entities.
     * @return the number of inserted rows (0 when there are no referenced ids).
     */
    public Mono<Integer> updateLinkTable(LinkTable table, Long entityId, Stream<Long> referencedIds) {
        Mono<Integer> insertedRows = Flux.fromStream(referencedIds)
            .flatMap((Long referenceId) ->
                db.insert()
                    .into(table.tableName)
                    .value(table.idColumn, entityId)
                    .value(table.referenceColumn, referenceId)
                    .fetch()
                    .rowsUpdated())
            // Sum the per-insert row counts; the seed makes an empty stream yield 0.
            .reduce(0, Integer::sum);
        return deleteFromLinkTable(table, entityId).then(insertedRows);
    }

    /**
     * Deletes the rows of the link table whose id column equals the given entity id.
     * @param table describes the link table.
     * @param entityId the id of the entity whose links are removed, must not be null.
     * @return a {@link Mono} completing when the delete has been executed.
     */
    public Mono<Void> deleteFromLinkTable(LinkTable table, Long entityId) {
        Assert.notNull(entityId, "entityId is null");
        return db.delete()
                .from(table.tableName)
                .matching(Criteria.from(Criteria.where(table.idColumn).is(entityId)))
                .then();
    }

    /**
     * Converts the orders of the given sort into order-by fields on the aliased columns of the table.
     * @param table the (aliased) entity table.
     * @param sortToUse the sort whose orders are converted.
     * @return one order-by field per order, in iteration order.
     */
    private static Collection<? extends OrderByField> createOrderByFields(Table table, Sort sortToUse) {

        List<OrderByField> fields = new ArrayList<>();

        for (Sort.Order order : sortToUse) {

            String propertyName = order.getProperty();
            // The column is referenced through its alias: ALIAS_PREFIX followed by the property name.
            OrderByField orderByField = OrderByField.from(table.column(propertyName).as(EntityManager.ALIAS_PREFIX + propertyName));

            if (order.getDirection() != null) {
                fields.add(order.isAscending() ? orderByField.asc() : orderByField.desc());
            } else {
                fields.add(orderByField);
            }
        }

        return fields;
    }

}
